package com.isunimp.sample.spark;

import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher;

/**
 * SparkLauncherMain class
 *
 * @author renguiquan
 * @date 2019/3/13
 */
public class SparkLauncherMain {

    public static void main(String[] args) throws IOException {
        fun2(args);
    }

    /**
     * Submits the sample job to YARN in cluster mode via
     * {@link SparkLauncher#startApplication} and polls the returned
     * {@link SparkAppHandle} until the application reaches a terminal state.
     *
     * @param args application arguments (currently unused; a fixed arg is passed)
     * @throws IOException if the launcher fails to start the Spark child process
     */
    static void fun2(String[] args) throws IOException {
        SparkAppHandle handler = new SparkLauncher()
                .setAppName("hello-world")
                .setSparkHome("/data/spark-2.4.0-bin-hadoop2.7")
                .setMaster("yarn")
                .setDeployMode("cluster")
                .setAppResource("/root/sample-spark-1.0.jar")
                .setMainClass("com.isunimp.sample.spark.MySQL")
                .addAppArgs("aaaaa")
                .setConf("spark.driver.memory", "2g")
                .setConf("spark.executor.memory", "1g")
                .setConf("spark.executor.cores", "3")
                .startApplication(new SparkAppHandle.Listener() {
                    @Override
                    public void stateChanged(SparkAppHandle handle) {
                        System.out.println("**********  state  changed  **********");
                    }

                    @Override
                    public void infoChanged(SparkAppHandle handle) {
                        System.out.println("**********  info  changed  **********");
                    }
                });

        // Poll until the app reaches ANY terminal state. The previous string
        // comparison only recognized FINISHED/FAILED and would spin forever on
        // KILLED or LOST; State.isFinal() covers every terminal state.
        while (!handler.getState().isFinal()) {
            System.out.println("id    " + handler.getAppId());
            System.out.println("state " + handler.getState());

            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop polling; swallowing the
                // exception would hide the interruption from callers.
                Thread.currentThread().interrupt();
                break;
            }
        }
    }

    /**
     * Alternative launch example (not invoked from {@link #main}): starts the
     * job as a raw child {@link Process} in yarn-client mode and pumps its
     * stdout/stderr on dedicated threads until the process exits.
     */
    static void fun1() {
        System.setProperty("HADOOP_USER_NAME", "hdfs");
        Map<String, String> env = new HashMap<>();
        env.put("HADOOP_CONF_DIR", "/etc/hadoop/conf");
        env.put("YARN_CONF_DIR", "/etc/hadoop/conf");
        env.put("SPARK_CONF_DIR", "/etc/spark2/conf");
        env.put("SPARK_HOME", "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2");
        env.put("JAVA_HOME", "/usr/java/jdk1.8.0_144");
        try {
            SparkLauncher spark = new SparkLauncher(env)
                    .setDeployMode("client")
                    .setAppResource("hdfs:///user/jars/spark_module-1.0-SNAPSHOT.jar")
                    .setMainClass("com.sinosoft.Table_count")
                    .setMaster("yarn-client")
                    .setConf(SparkLauncher.DRIVER_MEMORY, "1g")
                    // BUG FIX: setConf was called twice with the same key, so the
                    // second library path silently overwrote the first. Join both
                    // entries with the platform path separator instead.
                    .setConf(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH,
                            "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/jars/"
                                    + File.pathSeparator
                                    + "/opt/cloudera/parcels/CDH-5.13.3-1.cdh5.13.3.p0.2/jars/")
                    .setVerbose(true);
            // Launch the Spark job as a child process.
            Process process = spark.launch();
            InputStreamReaderRunnable inputStreamReaderRunnable = new InputStreamReaderRunnable(process.getInputStream(), "input");
            Thread inputThread = new Thread(inputStreamReaderRunnable, "LogStreamReader input");
            inputThread.start();

            InputStreamReaderRunnable errorStreamReaderRunnable = new InputStreamReaderRunnable(process.getErrorStream(), "error");
            Thread errorThread = new Thread(errorStreamReaderRunnable, "LogStreamReader error");
            errorThread.start();

            System.out.println("Waiting for finish...");
            int exitCode = process.waitFor();
            System.out.println("Finished! Exit code:" + exitCode);

        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            // Restore the interrupt flag rather than swallowing it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }

    /**
     * Drains an {@link InputStream} line by line to stdout. Used to pump the
     * launched process's stdout/stderr so the child does not block on full
     * pipe buffers.
     */
    static class InputStreamReaderRunnable implements Runnable {
        private final BufferedReader reader;
        private final String name;

        public InputStreamReaderRunnable(InputStream is, String name) {
            // Decode as UTF-8 explicitly; the no-arg InputStreamReader falls
            // back to the platform default charset (before Java 18).
            this.reader = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8));
            this.name = name;
        }

        @Override
        public void run() {
            System.out.println("InputStream " + name + ":");
            // try-with-resources guarantees the reader is closed even if
            // readLine throws mid-stream (the old code leaked it on error).
            try (BufferedReader in = reader) {
                String line;
                while ((line = in.readLine()) != null) {
                    System.out.println(line);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
